package day02.transform;

import beans.SensorReading;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - DataStream 八大分区策略与自定义分区器
 * <p>
 * 参考博客：https://blog.csdn.net/wangpei1949/article/details/100631663
 *
 * @author lvbingbing
 * @date 2021-12-14 21:03
 */
public class FlinkTransform07 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 4;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习 DataStream 八大分区策略与自定义分区器
        studyPartition((SingleOutputStreamOperator<SensorReading>) flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * DataStream 八大分区策略与自定义分区器
     * <p>
     * ⚫ GlobalPartitioner：DataStream -> DataStream <br>
     * GLOBAL分区。将记录输出到下游Operator的第一个实例。<br>
     * Tips：对每条记录，只选择下游operator的第一个channel
     * <p>
     * ⚫ ShufflePartitioner：DataStream -> DataStream <br>
     * SHUFFLE分区。将记录随机输出到下游Operator的每个实例。<br>
     * Tips：对每条记录，随机算则下游operator的某个channel
     * <p>
     * ⚫ RebalancePartitioner：DataStream -> DataStream <br>
     * REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。<br>
     * Tips：第一条记录，输出到下游的第一个channel；第二条记录，输出到下游的第二个channel...如此循环
     * <p>
     * ⚫ RescalePartitioner：DataStream -> DataStream <br>
     * RESCALE分区。基于上下游Operator的并行度，将记录以循环的方式输出到下游Operator的每个实例。<br>
     * 举例：上游并行度是2，下游是4，则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。<br>
     * 若上游并行度是4，下游并行度是2，则上游两个并行度将记录输出到下游一个并行度上；上游另两个并行度将记录输出到下游另一个并行度上。
     * <p>
     * ⚫ BroadcastPartitioner：DataStream -> DataStream <br>
     * BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。<br>
     * Tips：广播分区不支持选择channel，因为会输出到下游每个channel中
     * <p>
     * ⚫ ForwardPartitioner <br>
     * FORWARD分区。将记录输出到下游本地的operator实例。 <br>
     * ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks。
     * <p>
     * ⚫ KeyGroupStreamPartitioner(HASH方式) <br>
     * HASH分区。将记录按Key的Hash值输出到下游Operator实例。
     * <p>
     * ⚫ CustomPartitionerWrapper <br>
     * CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。
     *
     * @param sensorReadingStream <br>
     */
    private static void studyPartition(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 原始的dataStream
        sensorReadingStream.print("originStream");
        // 1、GlobalPartitioner
        sensorReadingStream.global()
                .print("globalPartitionStream");
        // 2、ShufflePartitioner
        sensorReadingStream.shuffle()
                .print("shufflePartitionStream");
        // 3、ReBalancePartitioner
        sensorReadingStream.rebalance()
                .print("reBalancePartitioner")
                .setParallelism(8);
        // 4、RescalePartitioner
        sensorReadingStream.rescale()
                .print("rescalePartitioner")
                .setParallelism(8);
        // 5、BroadcastPartitioner
        sensorReadingStream.broadcast()
                .print("broadcastPartitioner")
                .setParallelism(8);
        // 6、ForwardPartitioner
        sensorReadingStream.forward()
                .print("forwardPartitioner")
                .setParallelism(4);
        // 7、KeyGroupStreamPartitioner(HASH方式)
        sensorReadingStream.keyBy(SensorReading::getId)
                .print("keyGroupStreamPartitioner")
                .setParallelism(8);
        // 8、CustomPartitionerWrapper
        sensorReadingStream.partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String sensorId, int numPartitions) {
                        switch (sensorId) {
                            case "sensor_1":
                                return 1;
                            case "sensor_2":
                                return 2;
                            case "sensor_3":
                                return 3;
                            case "sensor_4":
                                return 4;
                            default:
                                return 0;
                        }
                    }
                }, "id")
                .print("customPartitioner");
    }
}